package com.lm.flink.entry;

import org.apache.flink.api.common.functions.AggregateFunction;

/**
 * Aggregate function that counts {@link LogEvent}s and emits the total as a
 * {@link LogLevelCount}.
 *
 * <p>The accumulator is a plain running count: every event handed to
 * {@link #add(LogEvent, Long)} increases it by one, regardless of the event's
 * content. The {@code level} and {@code windowEnd} supplied at construction
 * time are not used while counting; they are only forwarded to the
 * {@link LogLevelCount} built in {@link #getResult(Long)}.
 *
 * <p>NOTE(review): {@code windowEnd} is fixed when the function is constructed,
 * so every result produced by this instance carries the same value. Confirm
 * this is intended by the callers.
 */
public class LogLevelCounter implements AggregateFunction<LogEvent, Long, LogLevelCount> {

    // Configuration is set once in the constructor and never reassigned, so it
    // is declared final to keep the function instance immutable.
    private final String level;
    private final long windowEnd;

    /**
     * Creates a counter whose results are tagged with the given values.
     *
     * @param level     value passed as the first argument of every produced {@link LogLevelCount}
     * @param windowEnd value passed as the third argument of every produced {@link LogLevelCount}
     */
    public LogLevelCounter(String level, long windowEnd) {
        this.level = level;
        this.windowEnd = windowEnd;
    }

    /**
     * Starts a new aggregation with an empty count.
     *
     * @return the initial accumulator, {@code 0}
     */
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    /**
     * Records one more event in the running count.
     *
     * @param value       the incoming event; its content is not inspected
     * @param accumulator the count so far
     * @return {@code accumulator + 1}
     */
    @Override
    public Long add(LogEvent value, Long accumulator) {
        return accumulator + 1;
    }

    /**
     * Wraps the final count in a {@link LogLevelCount}.
     *
     * @param accumulator the final count
     * @return a new {@link LogLevelCount} built from the configured level, the
     *         count, and the configured window end
     */
    @Override
    public LogLevelCount getResult(Long accumulator) {
        return new LogLevelCount(level, accumulator, windowEnd);
    }

    /**
     * Combines two partial counts.
     *
     * @param a first partial count
     * @param b second partial count
     * @return the sum of both counts
     */
    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}

